package com.xxl.job.core.executor;

import com.xxl.job.core.Constant;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.util.NetUtil;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Core bootstrap class of the executor.
 *
 * <p>{@link #start()} builds the clients used to talk to the admin (scheduling center)
 * nodes, registers the {@link JobHandler}-annotated beans found in the Spring context,
 * initialises the job log path, starts the executor server and starts the log-cleanup
 * thread. {@link #destroy()} stops the job threads, the server and the cleanup thread.
 *
 * <p>Most registries in this class are static, i.e. shared by every instance in the JVM.
 *
 * Created by xuxueli on 2016/3/2 21:14.
 */
@Slf4j
public class XxlJobExecutor implements ApplicationContextAware {

    /** Comma-separated base addresses of the admin nodes; blank entries are ignored. */
    @Setter
    private String adminAddresses;

    /** Application name handed to the executor server on start. */
    @Setter
    private String appName;

    /** IP handed to the executor server on start. */
    @Setter
    private String ip;

    /** Executor server port; a value {@code <= 0} makes the executor look up an available port. */
    @Setter
    private int port;

    /** Access token passed to both the admin clients and the executor server. */
    @Setter
    private String accessToken;

    /** Log path handed to {@link XxlJobFileAppender#initLogPath}. */
    @Setter
    private String logPath;

    /** Retention value handed to the log-cleanup thread (presumably a number of days). */
    @Setter
    private int logRetentionDays;

    // ---------------------- start + stop ----------------------

    /**
     * Starts the executor. The steps run in this fixed order:
     * admin clients, job handler registry, log path, executor server, log-cleanup thread.
     *
     * @throws Exception if building an admin client or starting the executor server fails
     */
    public void start() throws Exception {
        // init admin-client: prepare the clients used to talk to the admin nodes
        initAdminBizList(adminAddresses, accessToken);

        // init jobName -> jobBean: scan the ApplicationContext for job handlers
        initJobHandlerRepository(applicationContext);

        // init the log path
        XxlJobFileAppender.initLogPath(logPath);

        // init executor-server (jetty, per the original author's note)
        initExecutorServer(port, ip, appName, accessToken);

        // init JobLogFileCleanThread: start the job that removes old log files
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
    }

    /**
     * Stops the executor: every registered job thread is asked to stop, then the executor
     * server is destroyed and the log-cleanup thread is told to stop.
     */
    public void destroy() {
        // destroy the job thread repository
        if (!jobThreadRepository.isEmpty()) {
            for (Integer jobId : jobThreadRepository.keySet()) {
                removeJobThread(jobId, "Web容器销毁终止");
            }
            jobThreadRepository.clear();
        }

        // destroy executor-server
        stopExecutorServer();

        // destroy JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().toStop();
    }

    // ---------------------- Spring applicationContext injection ----------------------

    /** Spring context injected through {@link #setApplicationContext}; {@code null} until then. */
    private static ApplicationContext applicationContext;

    /**
     * Stores the injected context in a static field so it is reachable without an instance.
     *
     * @param applicationContext the context supplied by Spring
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        XxlJobExecutor.applicationContext = applicationContext;
    }

    /**
     * @return the stored Spring context, or {@code null} if none has been injected yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    // ---------------------- admin clients ----------------------

    /** Clients used to actively talk to the admin nodes, one per configured address. */
    private static final List<AdminBiz> adminBizs = new ArrayList<AdminBiz>();

    /**
     * Builds one {@link AdminBiz} client per non-blank entry of {@code adminAddresses}
     * and appends it to the shared client list.
     *
     * @param adminAddresses comma-separated admin base addresses; {@code null} or blank adds nothing
     * @param accessToken    token handed to every client proxy
     * @throws Exception if a client proxy cannot be created
     */
    private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses == null || adminAddresses.trim().isEmpty()) {
            return;
        }

        for (String rawAddress : adminAddresses.trim().split(",")) {
            // Trim each entry: "a, b" must not produce a URL that starts with a space.
            String address = rawAddress.trim();
            if (address.isEmpty()) {
                continue;
            }

            // e.g. http://127.0.0.1:8080/xxl-job-admin/api
            String addressUrl = address.concat(Constant.ADMIN_BIZ_MAPPING);

            AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
            adminBizs.add(adminBiz);

            // The access token is a credential and is deliberately kept out of the log.
            log.info("构建到调度中心的Client, addressUrl : {}", addressUrl);
        }
    }

    /**
     * @return the live (not copied) list of admin clients built by {@link #start()}
     */
    public static List<AdminBiz> getAdminBizs() {
        return adminBizs;
    }

    // ---------------------- job handler repository ----------------------

    /** Job handler registry: job name -> handler bean. */
    private static final Map<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

    /**
     * @param name job handler name, as declared in {@link JobHandler#value()}
     * @return the handler registered under {@code name}, or {@code null} if there is none
     */
    public static IJobHandler loadJobHandler(String name) {
        return jobHandlerRepository.get(name);
    }

    /**
     * Registers every bean that carries {@link JobHandler} and is an {@link IJobHandler},
     * keyed by the annotation value. Does nothing when the context is {@code null} or
     * holds no annotated bean.
     *
     * @param applicationContext context to scan; may be {@code null}
     * @throws RuntimeException if two handlers declare the same name
     */
    private static void initJobHandlerRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
        if (serviceBeanMap == null || serviceBeanMap.isEmpty()) {
            return;
        }

        for (Map.Entry<String, Object> beanEntry : serviceBeanMap.entrySet()) {
            Object serviceBean = beanEntry.getValue();
            if (!(serviceBean instanceof IJobHandler)) {
                continue;
            }

            // Resolve the annotation through the context instead of serviceBean.getClass():
            // the runtime class may be a proxy on which getAnnotation() returns null.
            JobHandler annotation = applicationContext.findAnnotationOnBean(beanEntry.getKey(), JobHandler.class);
            if (annotation == null) {
                log.warn(">>>>>>>>>>> xxl-job jobhandler annotation not resolvable, bean:{}", beanEntry.getKey());
                continue;
            }

            String jobName = annotation.value();
            IJobHandler jobHandler = (IJobHandler) serviceBean;
            if (jobHandlerRepository.get(jobName) != null) {
                throw new RuntimeException("xxl-job jobhandler naming conflicts.");
            }

            log.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", jobName, jobHandler.getClass().getName());
            jobHandlerRepository.put(jobName, jobHandler);
        }
    }

    // ---------------------- executor-server(jetty) ----------------------
    // Original author's note: the heartbeat between this node and the executor registry
    // also lives behind this server factory.

    /** Factory that starts and destroys the executor server. */
    private final NetComServerFactory serverFactory = new NetComServerFactory();

    /**
     * Publishes the {@link ExecutorBiz} service, sets the access token and starts the server.
     *
     * @param port        port to listen on; when {@code <= 0} an available port is looked up
     *                    via {@code NetUtil.findAvailablePort(9999)}
     * @param ip          IP passed to the server factory
     * @param appName     application name passed to the server factory
     * @param accessToken token set on the server factory
     * @throws Exception if the server cannot be started
     */
    private void initExecutorServer(int port, String ip, String appName, String accessToken) throws Exception {
        // validate the port parameter
        int serverPort;
        if (port > 0) {
            serverPort = port;
        } else {
            serverPort = NetUtil.findAvailablePort(9999);
        }

        // rpc-service, based on jetty
        NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());
        NetComServerFactory.setAccessToken(accessToken);

        // start server: jetty + registry
        serverFactory.start(serverPort, ip, appName);
    }

    /** Destroys the executor server (jetty + registry + callback, per the original note). */
    private void stopExecutorServer() {
        serverFactory.destroy();
    }

    // ---------------------- job thread repository ----------------------

    /** Running job threads keyed by job id; the job id comes from the server side. */
    private static final ConcurrentHashMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

    /**
     * Creates and starts a thread for {@code jobId} and stores it in the repository.
     * A thread previously stored under the same id is told to stop and interrupted.
     *
     * @param jobId           id of the job
     * @param handler         handler the new thread runs
     * @param removeOldReason reason passed to the replaced thread's {@code toStop}
     * @return the newly started thread
     */
    public static JobThread registerJobThread(int jobId, IJobHandler handler, String removeOldReason) {
        JobThread created = new JobThread(jobId, handler);
        created.start();
        log.info(">>>>>>>>>>> xxl-job register JobThread success, jobId:{}, handler:{}", jobId, handler.getClass().getName());

        // put() (not putIfAbsent) on purpose: it returns the previous mapping,
        // which is exactly the thread that has just been replaced.
        JobThread replaced = jobThreadRepository.put(jobId, created);
        if (replaced != null) {
            replaced.toStop(removeOldReason);
            replaced.interrupt();
        }

        return created;
    }

    /**
     * Removes the thread stored for {@code jobId}, tells it to stop and interrupts it.
     * Does nothing when no thread is stored under that id.
     *
     * @param jobId           id of the job
     * @param removeOldReason reason passed to the removed thread's {@code toStop}
     */
    public static void removeJobThread(int jobId, String removeOldReason) {
        JobThread removed = jobThreadRepository.remove(jobId);
        if (removed == null) {
            return;
        }
        removed.toStop(removeOldReason);
        removed.interrupt();
    }

    /**
     * @param jobId id of the job
     * @return the thread stored for {@code jobId}, or {@code null} if there is none
     */
    public static JobThread loadJobThread(int jobId) {
        return jobThreadRepository.get(jobId);
    }
}